{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "b0ee9a28",
   "metadata": {},
   "source": [
    "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "3a2a7b51",
   "metadata": {},
   "source": [
    "# 33 - Amazon Neptune"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "42724a76",
   "metadata": {},
   "source": [
    "Note: to be able to use SPARQL you must either install `SPARQLWrapper` or install AWS SDK for pandas with `sparql` extra:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b2fc9329-ac74-4a3e-bf16-ce5b683d1695",
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "!pip install 'awswrangler[gremlin, opencypher, sparql]'"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "19f1f579-a41b-4d4c-b44e-cac0b99d5e46",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "## Initialize\n",
    "\n",
    "The first step to using AWS SDK for pandas with Amazon Neptune is to import the library and create a client connection.\n",
    "\n",
    "<div style=\"background-color:#eeeeee; padding:10px; text-align:left; border-radius:10px; margin-top:10px; margin-bottom:10px; \"><b>Note</b>: Connecting to Amazon Neptune requires that the application you are running has access to the Private VPC where Neptune is located. Without this access you will not be able to connect using AWS SDK for pandas.</div>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fd098b2c",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "import awswrangler as wr\n",
    "\n",
    "url = \"<INSERT CLUSTER ENDPOINT>\"  # The Neptune Cluster endpoint\n",
    "iam_enabled = False  # Set to True/False based on the configuration of your cluster\n",
    "neptune_port = 8182  # Set to the Neptune Cluster Port, Default is 8182\n",
    "client = wr.neptune.connect(url, neptune_port, iam_enabled=iam_enabled)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "1e9499ea",
   "metadata": {},
   "source": [
    "## Return the status of the cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "57903cf4",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(client.status())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6f13f0cb",
   "metadata": {},
   "source": [
    "## Retrieve Data from Neptune using AWS SDK for pandas\n",
    "\n",
    "AWS SDK for pandas supports querying Amazon Neptune using TinkerPop Gremlin and openCypher for property graph data or SPARQL for RDF data.\n",
    "\n",
    "### Gremlin"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2801f447",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = \"g.E().project('source', 'target').by(outV().id()).by(inV().id()).limit(5)\"\n",
    "df = wr.neptune.execute_gremlin(client, query)\n",
    "display(df.head(5))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "a7666d80",
   "metadata": {},
   "source": [
    "### SPARQL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "91b52363",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = \"\"\"\n",
    "        PREFIX foaf: <https://xmlns.com/foaf/0.1/>\n",
    "        PREFIX ex: <https://www.example.com/>\n",
    "        SELECT ?firstName WHERE { ex:JaneDoe foaf:knows ?person . ?person foaf:firstName ?firstName }\"\"\"\n",
    "df = wr.neptune.execute_sparql(client, query)\n",
    "display(df.head(5))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "367791b9",
   "metadata": {},
   "source": [
    "### openCypher"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ce5df2ee",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = \"MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 5\"\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "display(df.head(5))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "f91b967c",
   "metadata": {},
   "source": [
    "## Saving Data using AWS SDK for pandas\n",
    "\n",
    "AWS SDK for pandas supports saving Pandas DataFrames into Amazon Neptune using either a property graph or RDF data model.\n",
    "\n",
    "### Property Graph\n",
    "\n",
    "If writing to a property graph then DataFrames for vertices and edges must be written separately. DataFrames for vertices must have a `~label` column with the label and a `~id` column for the vertex id.\n",
    "\n",
    "If the `~id` column does not exist, the specified id does not exists, or is empty then a new vertex will be added.\n",
    "\n",
    "If no `~label` column exists then writing to the graph will be treated as an update of the element with the specified `~id` value.\n",
    "\n",
    "DataFrames for edges must have a `~id`, `~label`, `~to`, and `~from` column. If the `~id` column does not exist the specified id does not exists, or is empty then a new edge will be added. If no `~label`, `~to`, or `~from` column exists an exception will be thrown.\n",
    "\n",
    "#### Add Vertices/Nodes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "579fd9c0",
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "import string\n",
    "import uuid\n",
    "\n",
    "\n",
    "def _create_dummy_vertex():\n",
    "    data = dict()\n",
    "    data[\"~id\"] = uuid.uuid4()\n",
    "    data[\"~label\"] = \"foo\"\n",
    "    data[\"int\"] = random.randint(0, 1000)\n",
    "    data[\"str\"] = \"\".join(random.choice(string.ascii_lowercase) for i in range(10))\n",
    "    data[\"list\"] = [random.randint(0, 1000), random.randint(0, 1000)]\n",
    "    return data\n",
    "\n",
    "\n",
    "data = [_create_dummy_vertex(), _create_dummy_vertex(), _create_dummy_vertex()]\n",
    "df = pd.DataFrame(data)\n",
    "res = wr.neptune.to_property_graph(client, df)\n",
    "query = f\"MATCH (s) WHERE id(s)='{data[0]['~id']}' RETURN s\"\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "display(df)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "fd5fc8a2",
   "metadata": {},
   "source": [
    "#### Add Edges"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "515f7a0f",
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "import string\n",
    "import uuid\n",
    "\n",
    "\n",
    "def _create_dummy_edge():\n",
    "    data = dict()\n",
    "    data[\"~id\"] = uuid.uuid4()\n",
    "    data[\"~label\"] = \"bar\"\n",
    "    data[\"~to\"] = uuid.uuid4()\n",
    "    data[\"~from\"] = uuid.uuid4()\n",
    "    data[\"int\"] = random.randint(0, 1000)\n",
    "    data[\"str\"] = \"\".join(random.choice(string.ascii_lowercase) for i in range(10))\n",
    "    return data\n",
    "\n",
    "\n",
    "data = [_create_dummy_edge(), _create_dummy_edge(), _create_dummy_edge()]\n",
    "df = pd.DataFrame(data)\n",
    "res = wr.neptune.to_property_graph(client, df)\n",
    "query = f\"MATCH (s)-[r]->(d) WHERE id(r)='{data[0]['~id']}' RETURN r\"\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "display(df)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "efe6eaaf",
   "metadata": {},
   "source": [
    "#### Update Existing Nodes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d831c7a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "idval = uuid.uuid4()\n",
    "wr.neptune.execute_gremlin(client, f\"g.addV().property(T.id, '{str(idval)}')\")\n",
    "query = f\"MATCH (s) WHERE id(s)='{idval}' RETURN s\"\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "print(\"Before\")\n",
    "display(df)\n",
    "data = [{\"~id\": idval, \"age\": 50}]\n",
    "df = pd.DataFrame(data)\n",
    "res = wr.neptune.to_property_graph(client, df)\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "print(\"After\")\n",
    "display(df)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "bff6a1fc",
   "metadata": {},
   "source": [
    "#### Setting cardinality based on the header\n",
    "\n",
    " If you would like to save data using `single` cardinality then you can postfix (single) to the column header and\n",
    "    set `use_header_cardinality=True` (default). e.g. A column named `name(single)` will save the `name` property as single cardinality. You can disable this by setting `use_header_cardinality=False`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1010c2f5",
   "metadata": {},
   "outputs": [],
   "source": [
    "data = [_create_dummy_vertex()]\n",
    "df = pd.DataFrame(data)\n",
    "# Adding (single) to the column name in the DataFrame will cause it to write that property as `single` cardinality\n",
    "df.rename(columns={\"int\": \"int(single)\"}, inplace=True)\n",
    "res = wr.neptune.to_property_graph(client, df, use_header_cardinality=True)\n",
    "\n",
    "\n",
    "# This can be disabled by setting `use_header_cardinality = False`\n",
    "df.rename(columns={\"int\": \"int(single)\"}, inplace=True)\n",
    "res = wr.neptune.to_property_graph(client, df, use_header_cardinality=False)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "beca9dab",
   "metadata": {},
   "source": [
    "### RDF\n",
    "\n",
    "The DataFrame must consist of triples with column names for the subject, predicate, and object specified. If none are provided then `s`, `p`, and `o` are the default.\n",
    "\n",
    "If you want to add data into a named graph then you will also need the graph column, default is `g`.\n",
    "\n",
    "#### Write Triples"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1f8427b9",
   "metadata": {},
   "outputs": [],
   "source": [
    "def _create_dummy_triple(s: str = \"foo\"):\n",
    "    return {\n",
    "        \"s\": s,\n",
    "        \"p\": str(uuid.uuid4()),\n",
    "        \"o\": random.randint(0, 1000),\n",
    "    }\n",
    "\n",
    "\n",
    "label = f\"foo_{uuid.uuid4()}\"\n",
    "data = [_create_dummy_triple(label), _create_dummy_triple(label), _create_dummy_triple(label)]\n",
    "df = pd.DataFrame(data)\n",
    "res = wr.neptune.to_rdf_graph(client, df)\n",
    "\n",
    "query = f\"SELECT ?p ?o WHERE {{ <{label}> ?p ?o .}}\"\n",
    "df = wr.neptune.execute_sparql(client, query)\n",
    "display(df)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "b7a45c6a",
   "metadata": {},
   "source": [
    "#### Write Quads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "819f6a04",
   "metadata": {},
   "outputs": [],
   "source": [
    "def _create_dummy_quad(s: str):\n",
    "    data = _create_dummy_triple(s)\n",
    "    data[\"g\"] = \"bar\"\n",
    "    return data\n",
    "\n",
    "\n",
    "label = f\"foo_{uuid.uuid4()}\"\n",
    "query = f\"SELECT ?p ?o FROM <bar> WHERE {{ <{label}> ?p ?o .}}\"\n",
    "\n",
    "data = [_create_dummy_quad(label), _create_dummy_quad(label), _create_dummy_quad(label)]\n",
    "df = pd.DataFrame(data)\n",
    "res = wr.neptune.to_rdf_graph(client, df)\n",
    "df = wr.neptune.execute_sparql(client, query)\n",
    "display(df)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8370b377",
   "metadata": {},
   "source": [
    "## Flatten DataFrames\n",
    "\n",
    "One of the complexities of working with a row/columns paradigm, such as Pandas, with graph results set is that it is very common for graph results to return complex and nested objects. To help simplify using the results returned from a graph within a more tabular format we have added a method to flatten the returned Pandas DataFrame.\n",
    "\n",
    "### Flattening the DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4488e185",
   "metadata": {},
   "outputs": [],
   "source": [
    "client = wr.neptune.connect(url, 8182, iam_enabled=False)\n",
    "query = \"MATCH (n) RETURN n LIMIT 1\"\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "print(\"Original\")\n",
    "display(df)\n",
    "df_new = wr.neptune.flatten_nested_df(df)\n",
    "print(\"Flattened\")\n",
    "display(df_new)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "9324bff7",
   "metadata": {},
   "source": [
    "### Removing the prefixing of the parent column name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7e95099c",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_new = wr.neptune.flatten_nested_df(df, include_prefix=False)\n",
    "display(df_new)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "21738d39",
   "metadata": {},
   "source": [
    "### Specifying the column header separator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8f4bcbe3",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_new = wr.neptune.flatten_nested_df(df, separator=\"|\")\n",
    "display(df_new)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "1bded05b",
   "metadata": {},
   "source": [
    "## Putting it into a workflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9129f173",
   "metadata": {},
   "outputs": [],
   "source": [
    "pip install igraph networkx"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "cd49d635",
   "metadata": {},
   "source": [
    "### Running PageRank using NetworkX"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ecd88fe2",
   "metadata": {},
   "outputs": [],
   "source": [
    "import networkx as nx\n",
    "\n",
    "# Retrieve Data from neptune\n",
    "client = wr.neptune.connect(url, 8182, iam_enabled=False)\n",
    "query = \"MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100\"\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "\n",
    "# Run PageRank\n",
    "G = nx.from_pandas_edgelist(df, edge_attr=True)\n",
    "pg = nx.pagerank(G)\n",
    "\n",
    "# Save values back into Neptune\n",
    "rows = []\n",
    "for k in pg.keys():\n",
    "    rows.append({\"~id\": k, \"pageRank_nx(single)\": pg[k]})\n",
    "pg_df = pd.DataFrame(rows, columns=[\"~id\", \"pageRank_nx(single)\"])\n",
    "res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)\n",
    "\n",
    "# Retrieve newly saved data\n",
    "query = (\n",
    "    \"MATCH (n:airport) WHERE n.pageRank_nx IS NOT NULL RETURN n.code, n.pageRank_nx ORDER BY n.pageRank_nx DESC LIMIT 5\"\n",
    ")\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "display(df)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "783a599e",
   "metadata": {},
   "source": [
    "### Running PageRank using iGraph"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abb8c7ab",
   "metadata": {},
   "outputs": [],
   "source": [
    "import igraph as ig\n",
    "\n",
    "# Retrieve Data from neptune\n",
    "client = wr.neptune.connect(url, 8182, iam_enabled=False)\n",
    "query = \"MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100\"\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "\n",
    "# Run PageRank\n",
    "g = ig.Graph.TupleList(df.itertuples(index=False), directed=True, weights=False)\n",
    "pg = g.pagerank()\n",
    "\n",
    "# Save values back into Neptune\n",
    "rows = []\n",
    "for idx, v in enumerate(g.vs):\n",
    "    rows.append({\"~id\": v[\"name\"], \"pageRank_ig(single)\": pg[idx]})\n",
    "pg_df = pd.DataFrame(rows, columns=[\"~id\", \"pageRank_ig(single)\"])\n",
    "res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)\n",
    "\n",
    "# Retrieve newly saved data\n",
    "query = (\n",
    "    \"MATCH (n:airport) WHERE n.pageRank_ig IS NOT NULL RETURN n.code, n.pageRank_ig ORDER BY n.pageRank_ig DESC LIMIT 5\"\n",
    ")\n",
    "df = wr.neptune.execute_opencypher(client, query)\n",
    "display(df)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "19a2ae67",
   "metadata": {},
   "source": [
    "## Bulk Load"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "86d1bca1",
   "metadata": {},
   "source": [
    "Data can be written using the Neptune Bulk Loader by way of S3.\n",
    "The Bulk Loader is fast and optimized for large datasets.\n",
    "\n",
    "For details on the IAM permissions needed to set this up, see [here](https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3f3aa82f",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.DataFrame([_create_dummy_edge() for _ in range(1000)])\n",
    "\n",
    "wr.neptune.bulk_load(\n",
    "    client=client,\n",
    "    df=df,\n",
    "    path=\"s3://my-bucket/stage-files/\",\n",
    "    iam_role=\"arn:aws:iam::XXX:role/XXX\",\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e00bc8a5",
   "metadata": {},
   "source": [
    "Alternatively, if the data is already on S3 in CSV format, you can use the `neptune.bulk_load_from_files` function.\n",
    "This is also useful if the data is written to S3 as a byproduct of an AWS Athena command, as the example below will show."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a5263211",
   "metadata": {},
   "outputs": [],
   "source": [
    "sql = \"\"\"\n",
    "SELECT\n",
    "    <col_id> AS \"~id\"\n",
    "  , <label_id> AS \"~label\"\n",
    "  , *\n",
    "FROM <database>.<table>\n",
    "\"\"\"\n",
    "\n",
    "wr.athena.start_query_execution(\n",
    "    sql=sql,\n",
    "    s3_output=\"s3://my-bucket/stage-files-athena/\",\n",
    "    wait=True,\n",
    ")\n",
    "\n",
    "wr.neptune.bulk_load_from_files(\n",
    "    client=client,\n",
    "    path=\"s3://my-bucket/stage-files-athena/\",\n",
    "    iam_role=\"arn:aws:iam::XXX:role/XXX\",\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "58ee6866",
   "metadata": {},
   "source": [
    "Both the `bulk_load` and `bulk_load_from_files` functions are suitable at scale.\n",
    "The latter simply invokes the Neptune Bulk Loader on existing data in S3.\n",
    "The former, however, involves writing CSV data to S3. With `ray` and `modin` installed, this operation can also be distributed across multiple workers in a Ray cluster."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "awswrangler-v9JnknIF-py3.8",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5 (default, Apr 13 2022, 19:37:23) \n[Clang 13.0.0 (clang-1300.0.27.3)]"
  },
  "vscode": {
   "interpreter": {
    "hash": "83297b058d59ee0acd247586c837429190a8258f15c0eea6234359f5557dde51"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
